Skip to content

lib: add new apis to support creating processor unit for input and output #9957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

yunzvanessa
Copy link

@yunzvanessa yunzvanessa commented Feb 19, 2025

Summary

  1. Add 4 new library APIs to support creating and retrieving processors for input/output plugins
  • flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, ...): create a processor unit with specific event type and processor unit name for an input plugin.
  • flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, ...): create a processor unit with specific event type and processor unit name for an output plugin.
  • flb_input_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc): retrieve the processor associated with a given input plugin
  • flb_output_get_processor(flb_ctx_t *ctx, int ffd, struct flb_processor **proc): retrieve the processor associated with a given output plugin
  1. This PR also has a fix for processor which updates function signature of flb_processor_unit_create to use const char* for unit_name

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • [N/A] Example configuration file for the change:

  • Debug log output from testing the change:

# Example output log from an application that uses fluentbit as a C libary API. With the new processor unit APIs I added, I 
# enabled otel_envelop and content_modifier processor units to manipulate the log messages from the tail input plugin, 
# before sending the logs to openetelemetry output plugin

[2025/02/18 11:22:16] [debug] [input:tail:tail.0] inode=8603124, ErrorLog.txt, events: IN_ATTRIB
[2025/02/18 11:22:16] [debug] [input:tail:tail.0] inode=8603164, 8000_ErrorLog.txt, events: IN_ATTRIB
[2025/02/18 11:22:16] [debug] [input:tail:tail.0] inode=8603167, 8001_ErrorLog.txt, events: IN_ATTRIB
[2025/02/18 11:22:16] [debug] [input:tail:tail.0] inode=8594915, 8002_ErrorLog.txt, events: IN_ATTRIB
[2025/02/18 11:22:16] [debug] [input:tail:tail.0] inode=8594955, 7997_ErrorLog.txt, events: IN_ATTRIB
[2025/02/18 11:22:17] [debug] [input:tail:tail.0] inode=8603124, ErrorLog.txt, events: IN_MODIFY
[2025/02/18 11:22:17] [debug] [task] created task=0x7f22ec1853f0 id=1 OK
[2025/02/18 11:22:17] [debug] [output:stdout:stdout.1] task_id=1 assigned to thread #0
[2025/02/18 11:22:17] [debug] [out flush] cb_destroy coro_id=2
[2025/02/18 11:22:17] [ warn] [output:opentelemetry:opentelemetry.0] error performing HTTP request, remote host=remote.host.name:4318 connection error
[2025/02/18 11:22:17] [debug] [upstream] KA connection #65 to remote.host.name:4318 is now available
[2025/02/18 11:22:17] [debug] [upstream] KA connection #65 to remote.host.name:4318 has been disconnected by the remote service
[2025/02/18 11:22:17] [debug] [out flush] cb_destroy coro_id=1
[2025/02/18 11:22:17] [debug] [retry] new retry created for task_id=0 attempts=1
[2025/02/18 11:22:17] [ warn] [engine] failed to flush chunk '30926-1739906536.401558167.flb', retry in 8 seconds: task_id=0, input=tail.0 > output=opentelemetry.0 (out_id=0)
[2025/02/18 11:22:17] [debug] [input:tail:tail.0] inode=8603124, ErrorLog.txt, events: IN_MODIFY
[2025/02/18 11:22:17] [debug] [input:tail:tail.0] inode=8603124, ErrorLog.txt, events: IN_MODIFY
[2025/02/18 11:22:17] [debug] [input:tail:tail.0] inode=8603124, ErrorLog.txt, events: IN_MODIFY
  • Output from running the new unit tests:
./flb-it-processor
Test processor...

[0] TEST: [1744672531.793297403, {}, {"key1"=>12345, "key2"=>"fluent bit"}]
[0] TEST: [1744672531.793297403, {}, {"key1"=>12345, "key2"=>"fluent bit", "hostname"=>"monox"}]
[ OK ]
Test input_processor...                         [ OK ]
Test output_processor...                        [ OK ]
SUCCESS: All unit tests have passed.
  • Attached Valgrind output that shows no leaks or memory corruption was found
$ valgrind bin/flb-it-processor
==3845313== Memcheck, a memory error detector
==3845313== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==3845313== Using Valgrind-3.22.0 and LibVEX; rerun with -h for copyright info
==3845313== Command: bin/flb-it-processor
==3845313==
Test processor...

[0] TEST: [1744675864.751642558, {}, {"key1"=>12345, "key2"=>"fluent bit"}]
[0] TEST: [1744675864.751642558, {}, {"key1"=>12345, "key2"=>"fluent bit", "hostname"=>"monox"}]
==3845313== Warning: invalid file descriptor -1 in syscall close()
[ OK ]
==3845313== Warning: invalid file descriptor -1 in syscall close()
Test input_processor...                         ==3845313== Warning: invalid file descriptor -1 in syscall close()
[ OK ]
==3845313== Warning: invalid file descriptor -1 in syscall close()
Test output_processor...                        ==3845313== Warning: invalid file descriptor -1 in syscall close()
[ OK ]
==3845313== Warning: invalid file descriptor -1 in syscall close()
SUCCESS: All unit tests have passed.
==3845313==
==3845313== HEAP SUMMARY:
==3845313==     in use at exit: 0 bytes in 0 blocks
==3845313==   total heap usage: 5,529 allocs, 5,529 frees, 952,463 bytes allocated
==3845313==
==3845313== All heap blocks were freed -- no leaks are possible
==3845313==
==3845313== For lists of detected and suppressed errors, rerun with: -s
==3845313== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [N/A] Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@yunzvanessa
Copy link
Author

Hi,

Any updates on this PR?

Thanks!

@edsiper
Copy link
Member

edsiper commented Mar 27, 2025

thanks for this contribution.

To get it merged please adjust the code to the expected coding style and add unit tests:

@yunzvanessa yunzvanessa changed the title Lib: Add new APIs to support creating processor unit for input and output lib: add new APIs to support creating processor unit for input and output Apr 8, 2025
@yunzvanessa yunzvanessa changed the title lib: add new APIs to support creating processor unit for input and output lib: add new apis to support creating processor unit for input and output Apr 8, 2025
This patch introduces new APIs to create a single processor unit for
input and output instances:

1. flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type,
const char *processor_unit_name, int ffd,  …)

Create a single input processor unit for a specific event type and
unit name

2. flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type,
const char *processor_unit_name, int ffd,  …)

Create a single output processor unit for a specific event type
and unit name

Signed-off-by: Vanessa Zhang <[email protected]>
use const char* for unit_name

Signed-off-by: Vanessa Zhang <[email protected]>
Vanessa Zhang added 2 commits April 14, 2025 17:18
flb_input_processor_unit and flb_output_processor_unit

Signed-off-by: Vanessa Zhang <[email protected]>
This patch introduces new apis to retrieve processor associated with

a given input or output plugin:

int flb_input_get_processor(flb_ctx_t *ctx, int ffd,
struct flb_processor **proc)

int flb_output_get_processor(flb_ctx_t *ctx, int ffd,
struct flb_processor **proc)

Signed-off-by: Vanessa Zhang <[email protected]>
@yunzvanessa
Copy link
Author

Hi @edsiper,

I’ve updated this PR to include the required coding style adjustments, new unit tests, and two new library APIs to support processors. Please let me know if there are any other changes you’d like to see. Thank you for reviewing my code!

@yunzvanessa
Copy link
Author

Hi @edsiper,

Any updates on this pull request?

Thanks,
Vanessa

@patrick-stephens
Copy link
Contributor

Triggered a build of all targets to confirm no funny business on any of them - function exporting can go wonky with some of the older targets.

@patrick-stephens
Copy link
Contributor

Can you rebase @yunzvanessa as looks like Github is now blocking runs on Ubuntu 20 runners?

@yunzvanessa
Copy link
Author

Hi @patrick-stephens,

My branch should be up-to-date with master branch now. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
docs-required ok-package-test Run PR packaging tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants